Implement customizable task dispatch interceptor scheme#112
Open
conradbzura wants to merge 3 commits intomainfrom
Open
Implement customizable task dispatch interceptor scheme#112conradbzura wants to merge 3 commits intomainfrom
conradbzura wants to merge 3 commits intomainfrom
Conversation
Required by the new task dispatch interceptor scheme to bridge Wool interceptors to the gRPC async server interceptor interface.
Introduce a two-phase interceptor system for task and stream manipulation at the gRPC layer. Interceptors enable extensible processing pipelines for cross-cutting concerns like logging, authentication, and metrics without modifying core worker logic. InterceptorLike protocol defines the async generator contract: pre-dispatch task modification followed by response stream wrapping. The @interceptor decorator provides automatic global registration. WoolInterceptor bridges the Wool interceptor interface to gRPC's AsyncServerInterceptor. LocalWorker and WorkerProcess accept an optional interceptors list, falling back to globally registered interceptors when not specified.
Cover InterceptorLike protocol compliance, @interceptor decorator registration, WoolInterceptor gRPC bridge (dispatch-only filtering, task modification, stream wrapping, multi-interceptor chaining, error propagation), and updated public API surface assertions.
da28d48 to
7306c77
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implement a customizable interceptor system for task dispatch that allows users to hook into the task lifecycle — modifying tasks before dispatch and wrapping response streams during execution. Interceptors follow an async generator protocol with three phases: pre-dispatch, stream processing, and cleanup.
Add an
InterceptorLikeprotocol, an@interceptordecorator for global registration, and aWoolInterceptorbridge that adapts Wool interceptors to gRPC's server interceptor interface. Integrate interceptor support intoLocalWorkerandWorkerProcess. Export new public API symbols fromwool/__init__.py.Closes #105
Proposed changes
1. Interceptor protocol and registration
Add
wool/src/wool/runtime/routine/interceptor.pywith:InterceptorLike— aProtocoldefining the async generator contract: yield a modifiedTask(orNone) pre-dispatch, receive the response stream, and yield wrapped events.@interceptor— decorator that appends to a global registry.get_registered_interceptors()— return a copy of the registry.2. gRPC bridge
Add
WoolInterceptor(AsyncServerInterceptor)in the same module. The bridge deserializes the task from the protobuf request, runs each interceptor's pre-dispatch phase in forward order, re-serializes if modified, calls the real dispatch method, then wraps the response stream through each interceptor in reverse order. Only apply todispatchRPC calls — other methods (e.g.,stop) bypass interception.3. Worker integration
LocalWorkeraccepts an optionalinterceptorsparameter; defaults to globally registered interceptors. Pass an empty list to disable.WorkerProcessreceives the interceptor list and creates aWoolInterceptorwhen starting the gRPC server.4. Public API and housekeeping
get_registered_interceptorsandinterceptorfromwool/__init__.py.grpc-interceptortopyproject.tomldependencies.Test cases
TestInterceptorDecorator@interceptorappliedinterceptor()TestInterceptorDecorator@interceptorappliedinterceptor()TestInterceptorDecorator@interceptorinterceptor()TestInterceptorDecorator@interceptorapplied twiceinterceptor()TestGetRegisteredInterceptorsget_registered_interceptors()calledget_registered_interceptors()TestGetRegisteredInterceptorsget_registered_interceptors()calledget_registered_interceptors()TestGetRegisteredInterceptorsget_registered_interceptors()calledget_registered_interceptors()TestGetRegisteredInterceptorsget_registered_interceptors()TestWoolInterceptorWoolInterceptorinstantiated__init__()TestWoolInterceptorWoolInterceptorinstantiated__init__()TestWoolInterceptorintercept()called for dispatchintercept()early exitTestWoolInterceptorintercept()called for non-dispatch methodintercept()bypassTestWoolInterceptorNoneintercept()called for dispatchintercept()passthroughTestWoolInterceptorintercept()called for dispatchintercept()task modificationTestWoolInterceptorintercept()called for dispatchintercept()orderingTestWoolInterceptorintercept()called for dispatchintercept()chained modificationTestWoolInterceptorintercept()called for dispatchintercept()stream wrappingTestWoolInterceptorintercept()called for dispatchintercept()stream filteringTestWoolInterceptorintercept()called for dispatchintercept()stream injectionTestWoolInterceptorintercept()called for dispatchintercept()reverse wrappingTestWoolInterceptorintercept()called for dispatchintercept()error handlingTestWoolInterceptorStopAsyncIterationpre-dispatchintercept()called for dispatchintercept()edge caseTestWoolInterceptorintercept()called for dispatchintercept()error handlingTestWoolInterceptorintercept()called for dispatchintercept()edge caseTestWoolInterceptorintercept()called for dispatchintercept()error handlingTestWoolInterceptorintercept()called for dispatchintercept()full lifecycleTestWoolInterceptorintercept()called for dispatchintercept()idempotencyTestWoolInterceptorintercept()preservationTestWoolInterceptorintercept()called twiceintercept()determinismTestWoolInterceptorintercept()called for dispatchintercept()universalityImplementation plan
InterceptorLikeprotocol,@interceptordecorator, andget_registered_interceptors()ininterceptor.pyWoolInterceptorwith pre-dispatch, stream-wrapping, and error-handling logictest_interceptor.py)LocalWorkerandWorkerProcesswool/__init__.pygrpc-interceptordependency topyproject.toml